package com.xiaofan

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector


case class ReceiptEvent(txId: String, payChannel: String, timestamp: Long)


object TxMatch {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputStream1: DataStream[String] = env.readTextFile("D:\\big-data\\code\\UserBehaviorAnalysis\\OrderPayDetect\\src\\main\\resources\\OrderLog.csv")
    val orderEventStream: DataStream[OrderEvent] = inputStream1
      .map {
        data => {
          val arr: Array[String] = data.split(",")
          OrderEvent(arr(0).toLong, arr(1), arr(2), arr(3).toLong)
        }
      }
      .assignAscendingTimestamps(_.timestamp * 1000L)
      .filter(_.eventType == "pay")
      .keyBy(_.txId)

    val inputStream2: DataStream[String] = env.readTextFile("D:\\big-data\\code\\UserBehaviorAnalysis\\OrderPayDetect\\src\\main\\resources\\ReceiptLog.csv")
    val receiptEventStream: DataStream[ReceiptEvent] = inputStream2
      .map {
        data => {
          val arr: Array[String] = data.split(",")
          ReceiptEvent(arr(0), arr(1), arr(2).toLong)
        }
      }
      .assignAscendingTimestamps(_.timestamp * 1000L)
      .keyBy(_.txId)

    // 3. 合并两条流
    val resultStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream.connect(receiptEventStream)
      .process(new TxPayMatchResult())

    resultStream.print("matched")

    resultStream.getSideOutput(new OutputTag[OrderEvent]("unmatched-pay")).print("unmatched-pay")
    resultStream.getSideOutput(new OutputTag[ReceiptEvent]("unmatched-receipt")).print("unmatched-receipt")

    env.execute("tx job test")

  }

}

class TxPayMatchResult() extends CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {

  // 定义状态，保存当前交易对应的订单支付事件和到账事件
  lazy val payEventState: ValueState[OrderEvent] = getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("pay", classOf[OrderEvent]))
  lazy val receiptEventState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt", classOf[ReceiptEvent]))

  val unMatchPayEventOutputTag = new OutputTag[OrderEvent]("unmatched-pay")
  val unMatchReceiptEventOutputTag = new OutputTag[ReceiptEvent]("unmatched-receipt")

  override def processElement1(pay: OrderEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    // 订单支付来了，要判断之前是否有到账时间
    val receipt: ReceiptEvent = receiptEventState.value()

    if (receipt != null) {
      out.collect((pay, receipt))
      payEventState.clear()
      receiptEventState.clear()
    } else {
      // 这里定时器的时间要看对方数据的数据情况
      ctx.timerService().registerEventTimeTimer(pay.timestamp * 1000L + 5000L)
      // 更新状态
      payEventState.update(pay)
    }
  }

  override def processElement2(receipt: ReceiptEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    // 账单来了，要判断之前是否有订单支付
    val pay: OrderEvent = payEventState.value()

    if (pay != null) {
      out.collect((pay, receipt))
      payEventState.clear()
      receiptEventState.clear()
    } else {
      // 这里定时器的时间要看对方数据的数据情况
      ctx.timerService().registerEventTimeTimer(receipt.timestamp * 1000L + 3000L)
      // 更新状态
      receiptEventState.update(receipt)
    }
  }

  override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    // 定时器触发，判断状态中哪一个还存在，就代表另一个没有来
    if (payEventState.value() != null) {
      ctx.output(unMatchPayEventOutputTag, payEventState.value())
    }

    if (receiptEventState.value() != null) {
      ctx.output(unMatchReceiptEventOutputTag, receiptEventState.value())
    }

    payEventState.clear()
    receiptEventState.clear()
  }
}
